// Copyright (C) INFINI Labs & INFINI LIMITED.
//
// The INFINI Console is offered under the GNU Affero General Public License v3.0
// and as commercial software.
//
// For commercial licensing, contact us at:
//   - Website: infinilabs.com
//   - Email: hello@infini.ltd
//
// Open Source licensed under AGPL V3:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

/* Copyright © INFINI Ltd. All rights reserved.
 * web: https://infinilabs.com
 * mail: hello#infini.ltd */

package alerting

import (
	"fmt"
	"github.com/buger/jsonparser"
	log "github.com/cihub/seelog"
	"infini.sh/console/model/alerting"
	alerting2 "infini.sh/console/service/alerting"
	httprouter "infini.sh/framework/core/api/router"
	"infini.sh/framework/core/elastic"
	"infini.sh/framework/core/global"
	"infini.sh/framework/core/kv"
	"infini.sh/framework/core/orm"
	"infini.sh/framework/core/util"
	"net/http"
	"strconv"
	"strings"
	"time"
)

func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	body := struct {
		Messages      []alerting.AlertMessage `json:"messages"`
		IgnoredReason string                  `json:"ignored_reason"`
		IsReset       bool                    `json:"is_reset"`
	}{}
	err := h.DecodeJSON(req, &body)
	if err != nil {
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}

	if len(body.Messages) == 0 {
		h.WriteError(w, "messages should not be empty", http.StatusInternalServerError)
		return
	}
	messageIDs := make([]string, 0, len(body.Messages))
	for _, msg := range body.Messages {
		messageIDs = append(messageIDs, msg.ID)
	}
	currentUser := h.GetCurrentUser(req)
	must := []util.MapStr{
		{
			"terms": util.MapStr{
				"_id": messageIDs,
			},
		},
	}
	var source string
	if body.IsReset {
		must = append(must, util.MapStr{
			"term": util.MapStr{
				"status": util.MapStr{
					"value": alerting.MessageStateIgnored,
				},
			},
		})
		source = fmt.Sprintf("ctx._source['status'] = '%s'", alerting.MessageStateAlerting)
	} else {
		must = append(must, util.MapStr{
			"term": util.MapStr{
				"status": util.MapStr{
					"value": alerting.MessageStateAlerting,
				},
			},
		})
		source = fmt.Sprintf("ctx._source['status'] = '%s';ctx._source['ignored_time']='%s';ctx._source['ignored_reason']='%s';ctx._source['ignored_user']='%s'", alerting.MessageStateIgnored, time.Now().Format(time.RFC3339Nano), body.IgnoredReason, currentUser)
	}
	queryDsl := util.MapStr{
		"query": util.MapStr{
			"bool": util.MapStr{
				"must": must,
			},
		},
		"script": util.MapStr{
			"source": source,
		},
	}
	err = orm.UpdateBy(alerting.AlertMessage{}, util.MustToJSONBytes(queryDsl))
	if err != nil {
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		log.Error(err)
		return
	}
	//delete kv cache
	for _, msg := range body.Messages {
		_ = kv.DeleteKey(alerting2.KVLastMessageState, []byte(msg.RuleID))
	}

	h.WriteJSON(w, util.MapStr{
		"ids":    messageIDs,
		"result": "updated",
	}, 200)
}

func (h *AlertAPI) getAlertMessageStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	esClient := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID))
	must := []util.MapStr{
		{
			"terms": util.MapStr{
				"status": []string{
					alerting.MessageStateAlerting,
				},
			},
		},
	}
	clusterFilter, hasAllPrivilege := h.GetClusterFilter(req, "resource_id")
	if !hasAllPrivilege && clusterFilter == nil {
		h.WriteJSON(w, elastic.SearchResponse{}, http.StatusOK)
		return
	}
	if !hasAllPrivilege {
		must = append(must, clusterFilter)
	}
	queryDsl := util.MapStr{
		"size": 0,
		"query": util.MapStr{
			"bool": util.MapStr{
				"must": must,
			},
		},
		"aggs": util.MapStr{
			"terms_by_priority": util.MapStr{
				"terms": util.MapStr{
					"field": "priority",
					"size":  5,
				},
			},
		},
	}
	indexName := orm.GetWildcardIndexName(alerting.AlertMessage{})
	searchRes, err := esClient.SearchWithRawQueryDSL(indexName, util.MustToJSONBytes(queryDsl))
	if err != nil {
		h.WriteJSON(w, util.MapStr{
			"error": err.Error(),
		}, http.StatusInternalServerError)
		return
	}
	statusCounts := map[string]interface{}{}
	if termsAgg, ok := searchRes.Aggregations["terms_by_priority"]; ok {
		for _, bk := range termsAgg.Buckets {
			if status, ok := bk["key"].(string); ok {
				statusCounts[status] = bk["doc_count"]
			}
		}
	}
	for _, status := range []string{"info", "low", "medium", "high", "critical"} {
		if _, ok := statusCounts[status]; !ok {
			statusCounts[status] = 0
		}
	}
	must[0] = util.MapStr{
		"term": util.MapStr{
			"status": util.MapStr{
				"value": alerting.MessageStateIgnored,
			},
		},
	}
	queryDsl = util.MapStr{
		"query": util.MapStr{
			"bool": util.MapStr{
				"must": must,
			},
		},
	}
	countRes, err := esClient.Count(nil, indexName, util.MustToJSONBytes(queryDsl))
	if err != nil {
		h.WriteJSON(w, util.MapStr{
			"error": err.Error(),
		}, http.StatusInternalServerError)
		return
	}
	statusCounts[alerting.MessageStateIgnored] = countRes.Count

	queryDsl = util.MapStr{
		"size": 0,
		"aggs": util.MapStr{
			"terms_by_category": util.MapStr{
				"terms": util.MapStr{
					"field": "category",
					"size":  100,
				},
			},
			"terms_by_tags": util.MapStr{
				"terms": util.MapStr{
					"field": "tags",
					"size":  100,
				},
			},
		},
	}
	searchRes, err = esClient.SearchWithRawQueryDSL(indexName, util.MustToJSONBytes(queryDsl))
	if err != nil {
		h.WriteJSON(w, util.MapStr{
			"error": err.Error(),
		}, http.StatusInternalServerError)
		return
	}
	categories := []string{}
	if termsAgg, ok := searchRes.Aggregations["terms_by_category"]; ok {
		for _, bk := range termsAgg.Buckets {
			if cate, ok := bk["key"].(string); ok {
				categories = append(categories, cate)
			}
		}
	}
	tags := []string{}
	if termsAgg, ok := searchRes.Aggregations["terms_by_tags"]; ok {
		for _, bk := range termsAgg.Buckets {
			if tag, ok := bk["key"].(string); ok {
				tags = append(tags, tag)
			}
		}
	}
	h.WriteJSON(w, util.MapStr{
		"alert": util.MapStr{
			"current": statusCounts,
		},
		"categories": categories,
		"tags":       tags,
	}, http.StatusOK)
}

func (h *AlertAPI) searchAlertMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {

	var (
		queryDSL = `{"sort":[%s],"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d,"aggs": {
    "max_updated": {
      "max": {
        "field": "updated"
      }
    },
    "min_updated": {
      "min": {
        "field": "updated"
      }
    }
  }}`
		strSize     = h.GetParameterOrDefault(req, "size", "20")
		strFrom     = h.GetParameterOrDefault(req, "from", "0")
		status      = h.GetParameterOrDefault(req, "status", "")
		priority    = h.GetParameterOrDefault(req, "priority", "")
		sort        = h.GetParameterOrDefault(req, "sort", "")
		ruleID      = h.GetParameterOrDefault(req, "rule_id", "")
		min         = h.GetParameterOrDefault(req, "min", "")
		max         = h.GetParameterOrDefault(req, "max", "")
		mustBuilder = &strings.Builder{}
		sortBuilder = strings.Builder{}
		category    = h.GetParameterOrDefault(req, "category", "")
		tags        = h.GetParameterOrDefault(req, "tags", "")
	)
	timeRange := util.MapStr{}
	if min != "" {
		timeRange["gte"] = min
	}
	if max != "" {
		timeRange["lte"] = max
	}
	if len(timeRange) > 0 {
		timeFilter := util.MapStr{
			"range": util.MapStr{
				"updated": timeRange,
			},
		}
		mustBuilder.Write(util.MustToJSONBytes(timeFilter))
	} else {
		mustBuilder.WriteString(`{"match_all":{}}`)
	}

	if ruleID != "" {
		mustBuilder.WriteString(fmt.Sprintf(`,{"term":{"rule_id":{"value":"%s"}}}`, ruleID))
	}
	clusterFilter, hasAllPrivilege := h.GetClusterFilter(req, "resource_id")
	if !hasAllPrivilege && clusterFilter == nil {
		h.WriteJSON(w, elastic.SearchResponse{}, http.StatusOK)
		return
	}
	if !hasAllPrivilege {
		mustBuilder.WriteString(",")
		mustBuilder.Write(util.MustToJSONBytes(clusterFilter))
	}

	if sort != "" {
		sortParts := strings.Split(sort, ",")
		if len(sortParts) == 2 && sortParts[1] != "updated" {
			sortBuilder.WriteString(fmt.Sprintf(`{"%s":{ "order": "%s"}},`, sortParts[0], sortParts[1]))
		}
	}
	sortBuilder.WriteString(`{"updated":{ "order": "desc"}}`)

	if status != "" {
		mustBuilder.WriteString(",")
		mustBuilder.WriteString(fmt.Sprintf(`{"term":{"status":{"value":"%s"}}}`, status))
	}
	if priority != "" {
		mustBuilder.WriteString(",")
		mustBuilder.WriteString(fmt.Sprintf(`{"term":{"priority":{"value":"%s"}}}`, priority))
	}
	if category != "" {
		mustBuilder.WriteString(",")
		mustBuilder.WriteString(fmt.Sprintf(`{"term":{"category":{"value":"%s"}}}`, category))
	}
	if tags != "" {
		mustBuilder.WriteString(",")
		mustBuilder.WriteString(fmt.Sprintf(`{"term":{"tags":{"value":"%s"}}}`, tags))
	}
	size, _ := strconv.Atoi(strSize)
	if size <= 0 {
		size = 20
	}
	from, _ := strconv.Atoi(strFrom)
	if from < 0 {
		from = 0
	}

	q := orm.Query{}
	queryDSL = fmt.Sprintf(queryDSL, sortBuilder.String(), mustBuilder.String(), size, from)
	q.RawQuery = []byte(queryDSL)

	err, res := orm.Search(&alerting.AlertMessage{}, &q)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	esRes := elastic.SearchResponse{}
	err = util.FromJSONBytes(res.Raw, &esRes)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	for _, hit := range esRes.Hits.Hits {
		created, _ := parseTime(hit.Source["created"], time.RFC3339)
		updated, _ := parseTime(hit.Source["updated"], time.RFC3339)
		if !created.IsZero() && !updated.IsZero() {
			endTime := time.Now()
			if hit.Source["status"] == alerting.MessageStateRecovered {
				endTime = updated
			}
			hit.Source["duration"] = endTime.Sub(created).Milliseconds()
		}

	}
	h.WriteJSON(w, esRes, http.StatusOK)
}

func parseTime(t interface{}, layout string) (time.Time, error) {
	switch t.(type) {
	case string:
		return time.Parse(layout, t.(string))
	default:
		return time.Time{}, fmt.Errorf("unsupport time type")
	}
}

func (h *AlertAPI) getAlertMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	message := &alerting.AlertMessage{
		ID: ps.ByName("message_id"),
	}
	exists, err := orm.Get(message)
	if !exists || err != nil {
		log.Error(err)
		h.WriteJSON(w, util.MapStr{
			"_id":   message.ID,
			"found": false,
		}, http.StatusNotFound)
		return
	}
	rule := &alerting.Rule{
		ID: message.RuleID,
	}
	exists, err = orm.Get(rule)
	if !exists || err != nil {
		log.Error(err)
		h.WriteError(w, fmt.Sprintf("rule [%s] not found", rule.ID), http.StatusInternalServerError)
		return
	}
	metricExpression, _ := rule.Metrics.GenerateExpression()
	var (
		hitCondition   string
		bucketDiffType string
	)
	conditions := rule.Conditions
	if rule.BucketConditions != nil {
		conditions = *rule.BucketConditions
	}
	for i, cond := range conditions.Items {
		expression, _ := cond.GenerateConditionExpression()
		if cond.Priority == message.Priority {
			hitCondition = strings.ReplaceAll(expression, "result", "")
			if rule.BucketConditions != nil {
				bucketDiffType = string(cond.Type)
			}
		}
		conditions.Items[i].Expression = strings.ReplaceAll(expression, "result", metricExpression)
	}
	var duration time.Duration
	if message.Status == alerting.MessageStateRecovered {
		duration = message.Updated.Sub(message.Created)
	} else {
		duration = time.Now().Sub(message.Created)
	}
	detailObj := util.MapStr{
		"message_id":        message.ID,
		"rule_id":           message.RuleID,
		"rule_name":         rule.Name,
		"rule_enabled":      rule.Enabled,
		"title":             message.Title,
		"message":           message.Message,
		"priority":          message.Priority,
		"created":           message.Created,
		"updated":           message.Updated,
		"resource_name":     rule.Resource.Name,
		"resource_id":       rule.Resource.ID,
		"resource_objects":  rule.Resource.Objects,
		"conditions":        rule.Conditions,
		"bucket_conditions": rule.BucketConditions,
		"bucket_diff_type":  bucketDiffType,
		"duration":          duration.Milliseconds(),
		"ignored_time":      message.IgnoredTime,
		"ignored_reason":    message.IgnoredReason,
		"ignored_user":      message.IgnoredUser,
		"status":            message.Status,
		"expression":        rule.Metrics.Expression,
		"hit_condition":     hitCondition,
	}
	h.WriteJSON(w, detailObj, http.StatusOK)
}

func (h *AlertAPI) getMessageNotificationInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	message := &alerting.AlertMessage{
		ID: ps.ByName("message_id"),
	}
	exists, err := orm.Get(message)
	if !exists || err != nil {
		log.Error(err)
		h.WriteJSON(w, util.MapStr{
			"_id":   message.ID,
			"found": false,
		}, http.StatusNotFound)
		return
	}
	rule := &alerting.Rule{
		ID: message.RuleID,
	}
	exists, err = orm.Get(rule)
	if !exists || err != nil {
		log.Error(err)
		h.WriteError(w, fmt.Sprintf("rule [%s] not found", rule.ID), http.StatusInternalServerError)
		return
	}
	notificationInfo := util.MapStr{}
	if rule.NotificationConfig == nil && rule.RecoveryNotificationConfig == nil {
		notificationInfo["is_empty"] = true
		h.WriteJSON(w, notificationInfo, http.StatusOK)
		return
	}
	stats, err := getMessageNotificationStats(message)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if rule.NotificationConfig != nil {
		notificationInfo["alerting"] = util.MapStr{
			"accept_time_range":          rule.NotificationConfig.AcceptTimeRange,
			"throttle_period":            rule.NotificationConfig.ThrottlePeriod,
			"escalation_enabled":         rule.NotificationConfig.EscalationEnabled,
			"escalation_throttle_period": rule.NotificationConfig.EscalationThrottlePeriod,
			"normal_stats":               stats["normal"],
			"escalation_stats":           stats["escalation"],
		}
	}
	if rule.RecoveryNotificationConfig != nil {
		notificationInfo["recovery"] = util.MapStr{
			"stats": stats["recovery"],
		}
	}
	h.WriteJSON(w, notificationInfo, http.StatusOK)
}

func getMessageNotificationStats(msg *alerting.AlertMessage) (util.MapStr, error) {
	rangeQ := util.MapStr{
		"gte": msg.Created.UnixMilli(),
	}
	if msg.Status == alerting.MessageStateRecovered {
		rangeQ["lte"] = msg.Updated.UnixMilli()
	}
	aggs := util.MapStr{
		"grp_normal_channel": util.MapStr{
			"terms": util.MapStr{
				"field": "action_execution_results.channel_type",
				"size":  20,
			},
			"aggs": util.MapStr{
				"top": util.MapStr{
					"top_hits": util.MapStr{
						"sort": []util.MapStr{
							{
								"created": util.MapStr{
									"order": "desc",
								},
							},
						},
						"_source": util.MapStr{
							"includes": []string{"created", "action_execution_results.channel_name", "action_execution_results.channel_type", "action_execution_results.error"},
						},
						"size": 1,
					},
				},
			},
		},
		"grp_escalation_channel": util.MapStr{
			"terms": util.MapStr{
				"field": "escalation_action_results.channel_type",
				"size":  20,
			},
			"aggs": util.MapStr{
				"top": util.MapStr{
					"top_hits": util.MapStr{
						"sort": []util.MapStr{
							{
								"created": util.MapStr{
									"order": "desc",
								},
							},
						},
						"_source": util.MapStr{
							"includes": []string{"created", "escalation_action_results.channel_name", "escalation_action_results.channel_type", "escalation_action_results.error"},
						},
						"size": 1,
					},
				},
			},
		},
	}
	if msg.Status == alerting.MessageStateRecovered {
		aggs["grp_recover_channel"] = util.MapStr{
			"terms": util.MapStr{
				"field": "recover_action_results.channel_type",
				"size":  20,
			},
			"aggs": util.MapStr{
				"top": util.MapStr{
					"top_hits": util.MapStr{
						"sort": []util.MapStr{
							{
								"created": util.MapStr{
									"order": "desc",
								},
							},
						},
						"_source": util.MapStr{
							"includes": []string{"created", "recover_action_results.channel_name", "recover_action_results.channel_type", "recover_action_results.error"},
						},
						"size": 1,
					},
				},
			},
		}
	}
	query := util.MapStr{
		"size": 0,
		"aggs": aggs,
		"query": util.MapStr{
			"bool": util.MapStr{
				"must": []util.MapStr{
					{
						"range": util.MapStr{
							"created": rangeQ,
						},
					},
					{
						"term": util.MapStr{
							"rule_id": util.MapStr{
								"value": msg.RuleID,
							},
						},
					},
				},
			},
		},
	}
	q := orm.Query{
		RawQuery: util.MustToJSONBytes(query),
	}
	err, result := orm.Search(alerting.Alert{}, &q)
	if err != nil {
		return nil, err
	}

	var normalStats = extractStatsFromRaw(result.Raw, "grp_normal_channel", "action_execution_results")
	var escalationStats = extractStatsFromRaw(result.Raw, "grp_escalation_channel", "escalation_action_results")
	stats := util.MapStr{
		"normal":     normalStats,
		"escalation": escalationStats,
	}
	if msg.Status == alerting.MessageStateRecovered {
		recoverStats := extractStatsFromRaw(result.Raw, "grp_recover_channel", "recover_action_results")
		stats["recovery"] = recoverStats
	}

	return stats, nil
}
func extractStatsFromRaw(searchRawRes []byte, grpKey string, actionKey string) []util.MapStr {
	var stats []util.MapStr
	jsonparser.ArrayEach(searchRawRes, func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
		statsItem := util.MapStr{}
		statsItem["channel_type"], _ = jsonparser.GetString(value, "key")
		statsItem["count"], _ = jsonparser.GetInt(value, "doc_count")
		jsonparser.ArrayEach(value, func(v []byte, dataType jsonparser.ValueType, offset int, err error) {
			ck, _ := jsonparser.GetString(v, "channel_type")
			cn, _ := jsonparser.GetString(v, "channel_name")
			if ck == statsItem["channel_type"] {
				statsItem["channel_name"] = cn
				statsItem["error"], _ = jsonparser.GetString(v, "error")
			}
		}, "top", "hits", "hits", "[0]", "_source", actionKey)
		statsItem["last_time"], _ = jsonparser.GetString(value, "top", "hits", "hits", "[0]", "_source", "created")
		stats = append(stats, statsItem)
	}, "aggregations", grpKey, "buckets")
	return stats
}
